parallel.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. """Convenient parallelization of higher order functions.
  2. This module provides two helper functions, with appropriate fallbacks on
  3. Python 2 and on systems lacking support for synchronization mechanisms:
  4. - map_multiprocess
  5. - map_multithread
  6. These helpers work like Python 3's map, with two differences:
  7. - They don't guarantee the order of processing of
  8. the elements of the iterable.
  9. - The underlying process/thread pools chop the iterable into
  10. a number of chunks, so that for very long iterables using
  11. a large value for chunksize can make the job complete much faster
  12. than using the default value of 1.
  13. """
  14. __all__ = ['map_multiprocess', 'map_multithread']
  15. from contextlib import contextmanager
  16. from multiprocessing import Pool as ProcessPool
  17. from multiprocessing.dummy import Pool as ThreadPool
  18. from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
  19. from pip._vendor.six import PY2
  20. from pip._vendor.six.moves import map
  21. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  22. if MYPY_CHECK_RUNNING:
  23. from typing import Callable, Iterable, Iterator, Union, TypeVar
  24. from multiprocessing import pool
  25. Pool = Union[pool.Pool, pool.ThreadPool]
  26. S = TypeVar('S')
  27. T = TypeVar('T')
  28. # On platforms without sem_open, multiprocessing[.dummy] Pool
  29. # cannot be created.
  30. try:
  31. import multiprocessing.synchronize # noqa
  32. except ImportError:
  33. LACK_SEM_OPEN = True
  34. else:
  35. LACK_SEM_OPEN = False
  36. # Incredibly large timeout to work around bpo-8296 on Python 2.
  37. TIMEOUT = 2000000
  38. @contextmanager
  39. def closing(pool):
  40. # type: (Pool) -> Iterator[Pool]
  41. """Return a context manager making sure the pool closes properly."""
  42. try:
  43. yield pool
  44. finally:
  45. # For Pool.imap*, close and join are needed
  46. # for the returned iterator to begin yielding.
  47. pool.close()
  48. pool.join()
  49. pool.terminate()
  50. def _map_fallback(func, iterable, chunksize=1):
  51. # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
  52. """Make an iterator applying func to each element in iterable.
  53. This function is the sequential fallback either on Python 2
  54. where Pool.imap* doesn't react to KeyboardInterrupt
  55. or when sem_open is unavailable.
  56. """
  57. return map(func, iterable)
  58. def _map_multiprocess(func, iterable, chunksize=1):
  59. # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
  60. """Chop iterable into chunks and submit them to a process pool.
  61. For very long iterables using a large value for chunksize can make
  62. the job complete much faster than using the default value of 1.
  63. Return an unordered iterator of the results.
  64. """
  65. with closing(ProcessPool()) as pool:
  66. return pool.imap_unordered(func, iterable, chunksize)
  67. def _map_multithread(func, iterable, chunksize=1):
  68. # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
  69. """Chop iterable into chunks and submit them to a thread pool.
  70. For very long iterables using a large value for chunksize can make
  71. the job complete much faster than using the default value of 1.
  72. Return an unordered iterator of the results.
  73. """
  74. with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
  75. return pool.imap_unordered(func, iterable, chunksize)
  76. if LACK_SEM_OPEN or PY2:
  77. map_multiprocess = map_multithread = _map_fallback
  78. else:
  79. map_multiprocess = _map_multiprocess
  80. map_multithread = _map_multithread